package com.ludan.kafka3.demo;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.HashMap;
import java.util.Map;

public class JoinDemo {
    public static void main(String[] args) {

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("intpu-left");
        KStream<String, String> right = builder.stream("intpu-right");

        KStream<String, String> join = left.selectKey((key, value) -> value.split(",")[1])
                .join(right.selectKey((key, value) -> value.split(",")[0]), new ValueJoiner<String, String, String>() {
                    @Override
                    public String apply(String value1, String value2) {
                        return value1 + "--" + value2;
                    }
                }, JoinWindows.of(30000));

    }
}
